package xiaoa.java.es.client;

import java.lang.reflect.Field;


import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.util.Date;
import java.util.List;

import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;


import xiaoa.java.log.L;


public class ClientUtils {
	
	// 客户端
	private static TransportClient client;
	
	// 管理 工具类
    private static IndicesAdminClient adminClient;  
    
	/**
	 * 提交文档工具类
	 */
	private static ThreadLocal<BulkProcessor> staticBulkProcessorLocal = new ThreadLocal<>();
	
	
	public static TransportClient getClient(){
		return client;
	}
	
	
	
	
	public static IndicesAdminClient getAdminClient() {
		return adminClient != null ? adminClient : (adminClient = getClient().admin().indices());
	}


	/**
	 * node方式初始化
	 * @Title: initNode
	 * @author xiaoa
	 */
	@SuppressWarnings("resource")
	public static void initClient(String host , int port , String clusterName)throws Exception{
		
		
		Settings settings = Settings.builder().put("client.transport.sniff", false).put("cluster.name", clusterName).build();
		client  = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), port));
		
		
		System.out.println("链接成功");
		initBulk();
		
		System.out.println( "初始化成功");
		
		
	}
	
	
	/**
	 * 增加或者修改索引
	 * @Title: addAndUpIndex
	 * @author xiaoa
	 */
	public static void addOrUpIndex(Class<?> cla , String indexName){

		
	}
	
	/**
	 * 判断index是否存在
	 * @Title: existsIndex
	 * @param index
	 * @return
	 * @author xiaoa
	 */
	public static boolean existsIndex(String index)throws Exception{
	     if (index == null){
	    	 return false;
	     }	
	     
	     IndicesExistsRequest  request = new IndicesExistsRequest("index");
	     return getAdminClient().exists(request).get().isExists();
		
	}
	
	
	/**
	 * 初始化提交文档工具
	 * @Title: initBulk
	 * @author xiaoa
	 */
	public static void initBulk(){
		
	        if (staticBulkProcessorLocal.get() != null){
	        	return ;
	        }
		
		    BulkProcessor   staticBulkProcessor = BulkProcessor.builder(client,  new BulkProcessor.Listener() {

			@Override
			public void beforeBulk(long executionId, BulkRequest request) {
				 //提交前调用
				  System.out.println(new Date().toString() + " before");
				
				
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
				 System.out.println( "提交" + response.getItems().length + "个文档，用时"
						 + response.getTookInMillis() + "MS" + (response.hasFailures() ? " 有文档提交失败！" : ""));
				 //response.hasFailures();//是否有提交失败
				 //提交结束后调用（无论成功或失败）
			}

			@Override
			public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
				//提交结束且失败时调用
				System.out.println( " 有文档提交失败！after failure=" + failure);
				
			}
			
		}).setBulkActions(1000)//文档数量达到1000时提交
		 .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//总文档体积达到5MB时提交 //
		 .setFlushInterval(TimeValue.timeValueSeconds(1))//每5S提交一次（无论文档数量、体积是否达到阈值）
		 .setConcurrentRequests(1)//加1后为可并行的提交请求数，即设为0代表只可1个请求并行，设为1为2个并行
		 .build();
		 
		 // 设置到本地线程
		 staticBulkProcessorLocal.set(staticBulkProcessor);

	}
	
	
	/**
	 * 刷星
	 * @Title: fushDoc
	 * @author xiaoa
	 */
	public static void fushDoc(){
		
	
		
		getBulk().flush();
		
	}
	
	
	
	/**
	 * 获取提交工具
	 * @Title: getBulk
	 * @return
	 * @author xiaoa
	 */
	public static BulkProcessor getBulk(){
		
		initBulk();
		
		return staticBulkProcessorLocal.get();
	}
	
	
	
	
    public static void testInfo() {
        List<DiscoveryNode> nodes = client.connectedNodes();
        for (DiscoveryNode node : nodes) {
            System.out.println(node.getHostAddress());
        }
    }
    
	
	
    /**
     * 更新文档
     * @Title: update
     * @param index
     * @param type
     * @param id
     * @param bodyMap
     * @throws Exception
     * @author xiaoa
     */
    public static void update(String index , String type , String id , Object obj)throws Exception{
    	
    	
    	XContentBuilder  content =  XContentFactory.jsonBuilder().startObject();
    	
    	Field[] fs = obj.getClass().getFields();
    	
    	
    	for (Field  f :fs){
    		
    		if (Modifier.isStatic(f.getModifiers())){
    			continue;
    		}
    		
    		f.setAccessible(true);
    		
    		if (f.get(obj) == null){
    			continue;
    		}
    		
    		System.out.println("name = " + f.getName() + " value = " + f.get(obj));
    		
    		content.field(f.getName(), f.get(obj) + "");
    	}
    	
    	UpdateRequest updateRequest = new UpdateRequest();
        updateRequest.index(index);
        updateRequest.type(type);
        updateRequest.id(id);
        updateRequest.doc(content.endObject());
        
        System.out.println("updateRequest = " + updateRequest);
        
        UpdateResponse response = client.update(updateRequest).get();
        
        System.out.println(response.toString());
        
    }
    
    
    /**
     * 添加文档
     * @Title: addDoc
     * @param index
     * @param type
     * @param id
     * @param json
     * @author xiaoa
     */
    public static void  addDocToCache(String index , String type ,String id , String json ){
    	
    	IndexRequest  request = new IndexRequest( index , type); 
    	
    	request.source(json , XContentType.JSON);
    	request.id(id);
    	getBulk().add(request);
    	L.info("add: index = " + index + " type = " + type + " id = " + id + " length = " + json.length() );
    	
    }
    
    
    /**
     * 立即刷新
     * @Title: addDoc
     * @param index
     * @param type
     * @param id
     * @param json
     * @author xiaoa
     */
    public static void  addDoc(String index , String type ,String id , String json ){
    	
    	IndexRequestBuilder  request = client.prepareIndex(index , type);
    	request.setSource(json,XContentType.JSON).setId(id);
    	client.prepareBulk().add(request).get();
    	
    }
    
    
	
	public static void main(String[] args)throws Throwable {
		
		initClient("192.168.218.133" , 9300  , "elasticsearch");		
		testInfo();
	
		
		
	}
	

}
